19.3 选择
选择模式(select)是从多个channel里随机选出可用的那个,编译器会将相关语句翻译成具体的函数调用。
test.go
package main import () func main() { c1, c2 := make(chan int), make(chan int, 2) select { case c1 ← 1: println(0x11) case ←c2: println(0x22) default: println(0xff) } }
反汇编:
go tool objdump -s “main.main” test TEXT main.main(SB) test.go test.go:6 0x2073 CALL runtime.makechan(SB) // make(c1) test.go:6 0x2096 CALL runtime.makechan(SB) // make(c2) test.go:8 0x20e2 CALL runtime.newselect(SB) // newselect test.go:9 0x2104 CALL runtime.selectsend(SB) // case c1 test.go:11 0x2153 CALL runtime.selectrecv(SB) // case c2 test.go:13 0x2189 CALL runtime.selectdefault(SB) // case default test.go:8 0x21c2 CALL runtime.selectgo(SB) // selectgo
完整的select对象由“header+[n]scase”组成,完全是C不定长结构体的风格。
select.go
type hselect struct { tcase uint16 // ncase 总数 ncase uint16 // ncase 初始化顺序 pollorder *uint16 // 乱序后的 scase 序号 lockorder **hchan // 按 scase channel 地址排序 scase [1]scase // scase 数组 } type scase struct { elem unsafe.Pointer // data element c *hchan // chan pc uintptr // return pc kind uint16 so uint16 // vararg of selected bool receivedp *bool // pointer to received bool (recv2) releasetime int64 }
初始化函数newselect除设置相关初始属性外,还将一次性分配的内存切分给相关字段。
select.go
func newselect(sel *hselect, selsize int64, size int32) { if selsize != int64(selectsize(uintptr(size))) { throw(“bad select size”) } sel.tcase = uint16(size) sel.ncase = 0 sel.lockorder = (**hchan)(add(unsafe.Pointer(&sel.scase), uintptr(size)*unsafe.Sizeof(hselect{}.scase[0]))) sel.pollorder = (*uint16)(add(unsafe.Pointer(sel.lockorder), uintptr(size)*unsafe.Sizeof(*hselect{}.lockorder))) } func selectsize(size uintptr) uintptr { selsize := unsafe.Sizeof(hselect{}) + (size-1)unsafe.Sizeof(hselect{}.scase[0]) + sizeunsafe.Sizeof(hselect{}.lockorder) + sizeunsafe.Sizeof(*hselect{}.pollorder) return round(selsize, _Int64Align) }
在处理好select对象后,还须初始化scase。过程并不复杂,依ncase确定位置,设置相关参数。依照case channel操作方式,可分为send、recv、default三种。
select.go
func selectsendImpl(sel *hselect, c *hchan, pc uintptr, elem unsafe.Pointer, so uintptr) { // 确定位置 i := sel.ncase sel.ncase = i + 1 // 获取 scase,初始化 cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0]))) cas.pc = pc cas.c = c cas.so = uint16(so) cas.kind = caseSend cas.elem = elem } func selectrecvImpl(sel *hselect, c *hchan, pc uintptr, elem unsafe.Pointer, …) { i := sel.ncase sel.ncase = i + 1 cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0]))) cas.pc = pc cas.c = c cas.so = uint16(so) cas.kind = caseRecv cas.elem = elem cas.receivedp = received } func selectdefaultImpl(sel *hselect, callerpc uintptr, so uintptr) { i := sel.ncase sel.ncase = i + 1 cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0]))) cas.pc = callerpc cas.c = nil // 注意,这个会影响后面 lockorder 排序 cas.so = uint16(so) cas.kind = caseDefault }
选择算法又是一个充斥goto跳转的超长大杂烩,精简掉无关代码后,耐心点慢慢看。
select.go
func selectgo(sel *hselect) { pc, offset := selectgoImpl(sel) *(*bool)(add(unsafe.Pointer(&sel), uintptr(offset))) = true setcallerpc(unsafe.Pointer(&sel), pc) } func selectgoImpl(sel *hselect) (uintptr, uint16) { // 为访问方便,将 scase 封装成 slice scaseslice := slice{unsafe.Pointer(&sel.scase), int(sel.ncase), int(sel.ncase)} scases := ([]scase)(unsafe.Pointer(&scaseslice)) // pollorder: 对 scases 序号洗牌,乱序 // lockorder: 按 channel 地址顺序排序 // 锁定全部 channel sellock(sel) loop: // ----------------------- // 1: 查找已准备好的 case // ----------------------- for i := 0; i < int(sel.ncase); i++ { // 从乱序的 pollorder 中获取,这就是 select 随机选择的关键 cas = &scases[pollorder[i]] c = cas.c switch cas.kind { case caseRecv: if c.dataqsiz > 0 { // 异步 if c.qcount > 0 { // 缓冲槽有数据 goto asyncrecv } } else { // 同步 sg = c.sendq.dequeue() if sg != nil { // 有发送者 goto syncrecv } } if c.closed != 0 { // 关闭 goto rclose } case caseSend: if c.closed != 0 { goto sclose } if c.dataqsiz > 0 { if c.qcount < c.dataqsiz { goto asyncsend } } else { sg = c.recvq.dequeue() if sg != nil { goto syncsend } } case caseDefault: dfl = cas } } // 如果没有准备好的 case,尝试执行 default if dfl != nil { selunlock(sel) cas = dfl goto retc } // ----------------------------------------------------- // 2: 如果没有任何准备好的 case,将当前 select G 打包成 sudog, // 放到所有 channel 排队列表,等待唤醒 // ----------------------------------------------------- gp = getg() done = 0 for i := 0; i < int(sel.ncase); i++ { cas = &scases[pollorder[i]] c = cas.c // 打包成 sudog // 每个 case 的 sudog 都不同 sg := acquireSudog() sg.g = gp sg.selectdone = (*uint32)(noescape(unsafe.Pointer(&done))) sg.elem = cas.elem // 全部 sudog 被放入 gp.waiting 链表 // 此链表顺序同 pollorder,后面以此识别是哪个 case 唤醒的 sg.waitlink = gp.waiting gp.waiting = sg // 根据 case 类型,决定放入发送或接收者排队列表 switch cas.kind { case caseRecv: c.recvq.enqueue(sg) case caseSend: c.sendq.enqueue(sg) } } // 休眠 select G,直到某个 case channel 活动后,从排队列表将其提取并唤醒 gp.param = nil gopark(selparkcommit, unsafe.Pointer(sel), “select”, traceEvGoBlockSelect|futile, 2) // 被唤醒 sellock(sel) sg = (*sudog)(gp.param) // 注意唤醒参数,就是待查 sudog gp.param = nil // -------------------------------- // 3: 找出是哪个 case 唤醒 select G的 // -------------------------------- // 使用第二步准备好的 gp.waiting 链表 sglist = gp.waiting gp.waiting = nil for i := int(sel.ncase) - 1; i >= 0; i— { // 同样使用 pollorder,所以和 gp.waiting 顺序一致 k = &scases[pollorder[i]] if sg == sglist { // 匹配 cas = k } else { // 不匹配,将 sudog 从 channel 排队列表移除 c = k.c if k.kind == caseSend { c.sendq.dequeueSudoG(sglist) } else { c.recvq.dequeueSudoG(sglist) } } // 利用循环清理掉所有排队 sudog sgnext = sglist.waitlink sglist.waitlink = nil releaseSudog(sglist) sglist = sgnext } // 没找到匹配,可能被意外唤醒,重新开始 if cas == nil { goto loop } // 找到目标,解锁,退出 selunlock(sel) goto retc // 这些前面已分析过,略过 asyncrecv: asyncsend: syncrecv: rclose: syncsend: retc: return cas.pc, cas.so sclose: selunlock(sel) panic(“send on closed channel”) }
对这种代码风格,真是无语了。就算原本是C也没必要写成这样吧,什么时候才能整理干净?
简化后的流程看上去就清爽多了。
1.用pollorder“随机”遍历,找出准备好的case。
2.如没有可用case,则尝试default case。
3.如都不可用,则将selectG打包放入所有channel的排队列表。
4.直到select G被某个channel唤醒,遍历ncase查找目标case。
每次操作,都需要对全部channel加锁,这种粒度似乎太大了些。
select.go
func sellock(sel *hselect) { lockslice := slice{unsafe.Pointer(sel.lockorder), int(sel.ncase), int(sel.ncase)} lockorder := ([]*hchan)(unsafe.Pointer(&lockslice)) var c *hchan for _, c0 := range lockorder { // 如果和前一 channel 地址不同,则加锁 // lockorder 的作用就是避免对同一 channel 重复加锁 if c0 != nil && c0 != c { c = c0 lock(&c.lock) } } } func selunlock(sel *hselect) { n := int(sel.ncase) r := 0 lockslice := slice{unsafe.Pointer(sel.lockorder), n, n} lockorder := ([]*hchan)(unsafe.Pointer(&lockslice)) // 因为 default case 的 channel = nil,所以总是排在 lockorder[0],跳过 if n > 0 && lockorder[0] == nil { r = 1 } for i := n - 1; i >= r; i— { c := lockorder[i] // 避免重复解锁 if i > 0 && c == lockorder[i-1] { continue } unlock(&c.lock) } }
官方已确定要改进select lock,只是release时间未定。